#include <xrpld/app/ledger/AccountStateSF.h>
#include <xrpld/app/ledger/InboundLedger.h>
#include <xrpld/app/ledger/InboundLedgers.h>
#include <xrpld/app/ledger/LedgerMaster.h>
#include <xrpld/app/ledger/TransactionStateSF.h>
#include <xrpld/app/main/Application.h>
#include <xrpld/core/JobQueue.h>
#include <xrpld/overlay/Overlay.h>

#include <xrpl/basics/Log.h>
#include <xrpl/protocol/HashPrefix.h>
#include <xrpl/protocol/jss.h>
#include <xrpl/resource/Fees.h>
#include <xrpl/shamap/SHAMapNodeID.h>

#include <boost/iterator/function_output_iterator.hpp>

#include <algorithm>
#include <random>

namespace ripple {

using namespace std::chrono_literals;

enum {
    // Number of peers to start with
    peerCountStart = 5

    // Number of peers to add on a timeout
    ,
    peerCountAdd = 3

    // how many timeouts before we give up
    ,
    ledgerTimeoutRetriesMax = 6

    // how many timeouts before we get aggressive
    ,
    ledgerBecomeAggressiveThreshold = 4

    // Number of nodes to find initially
    ,
    missingNodesFind = 256

    // Number of nodes to request for a reply
    ,
    reqNodesReply = 128

    // Number of nodes to request blindly
    ,
    reqNodes = 12
};

// millisecond for each ledger timeout
auto constexpr ledgerAcquireTimeout = 3000ms;

InboundLedger::InboundLedger(
    Application& app,
    uint256 const& hash,
    std::uint32_t seq,
    Reason reason,
    clock_type& clock,
    std::unique_ptr<PeerSet> peerSet)
    : TimeoutCounter(
          app,
          hash,
          ledgerAcquireTimeout,
          {jtLEDGER_DATA, "InboundLedger", 5},
          app.journal("InboundLedger"))
    , m_clock(clock)
    , mHaveHeader(false)
    , mHaveState(false)
    , mHaveTransactions(false)
    , mSignaled(false)
    , mByHash(true)
    , mSeq(seq)
    , mReason(reason)
    , mReceiveDispatched(false)
    , mPeerSet(std::move(peerSet))
{
    JLOG(journal_.trace()) << "Acquiring ledger " << hash_;
    touch();
}

void
InboundLedger::init(ScopedLockType& collectionLock)
{
    ScopedLockType sl(mtx_);
    collectionLock.unlock();

    tryDB(app_.getNodeFamily().db());
    if (failed_)
        return;

    if (!complete_)
    {
        addPeers();
        queueJob(sl);
        return;
    }

    JLOG(journal_.debug()) << "Acquiring ledger we already have in "
                           << " local store. " << hash_;
    XRPL_ASSERT(
        mLedger->info().seq < XRP_LEDGER_EARLIEST_FEES ||
            mLedger->read(keylet::fees()),
        "ripple::InboundLedger::init : valid ledger fees");
    mLedger->setImmutable();

    if (mReason == Reason::HISTORY)
        return;

    app_.getLedgerMaster().storeLedger(mLedger);

    // Check if this could be a newer fully-validated ledger
    if (mReason == Reason::CONSENSUS)
        app_.getLedgerMaster().checkAccept(mLedger);
}

std::size_t
InboundLedger::getPeerCount() const
{
    auto const& peerIds = mPeerSet->getPeerIds();
    return std::count_if(peerIds.begin(), peerIds.end(), [this](auto id) {
        return (app_.overlay().findPeerByShortID(id) != nullptr);
    });
}

void
InboundLedger::update(std::uint32_t seq)
{
    ScopedLockType sl(mtx_);

    // If we didn't know the sequence number, but now do, save it
    if ((seq != 0) && (mSeq == 0))
        mSeq = seq;

    // Prevent this from being swept
    touch();
}

bool
InboundLedger::checkLocal()
{
    ScopedLockType sl(mtx_);
    if (!isDone())
    {
        if (mLedger)
            tryDB(mLedger->stateMap().family().db());
        else
            tryDB(app_.getNodeFamily().db());
        if (failed_ || complete_)
        {
            done();
            return true;
        }
    }
    return false;
}

InboundLedger::~InboundLedger()
{
    // Save any received AS data not processed. It could be useful
    // for populating a different ledger
    for (auto& entry : mReceivedData)
    {
        if (entry.second->type() == protocol::liAS_NODE)
            app_.getInboundLedgers().gotStaleData(entry.second);
    }
    if (!isDone())
    {
        JLOG(journal_.debug())
            << "Acquire " << hash_ << " abort "
            << ((timeouts_ == 0) ? std::string()
                                 : (std::string("timeouts:") +
                                    std::to_string(timeouts_) + " "))
            << mStats.get();
    }
}

static std::vector<uint256>
neededHashes(
    uint256 const& root,
    SHAMap& map,
    int max,
    SHAMapSyncFilter* filter)
{
    std::vector<uint256> ret;

    if (!root.isZero())
    {
        if (map.getHash().isZero())
            ret.push_back(root);
        else
        {
            auto mn = map.getMissingNodes(max, filter);
            ret.reserve(mn.size());
            for (auto const& n : mn)
                ret.push_back(n.second);
        }
    }

    return ret;
}

std::vector<uint256>
InboundLedger::neededTxHashes(int max, SHAMapSyncFilter* filter) const
{
    return neededHashes(mLedger->info().txHash, mLedger->txMap(), max, filter);
}

std::vector<uint256>
InboundLedger::neededStateHashes(int max, SHAMapSyncFilter* filter) const
{
    return neededHashes(
        mLedger->info().accountHash, mLedger->stateMap(), max, filter);
}

// See how much of the ledger data is stored locally
// Data found in a fetch pack will be stored
void
InboundLedger::tryDB(NodeStore::Database& srcDB)
{
    if (!mHaveHeader)
    {
        auto makeLedger = [&, this](Blob const& data) {
            JLOG(journal_.trace()) << "Ledger header found in fetch pack";
            mLedger = std::make_shared<Ledger>(
                deserializePrefixedHeader(makeSlice(data)),
                app_.config(),
                app_.getNodeFamily());
            if (mLedger->info().hash != hash_ ||
                (mSeq != 0 && mSeq != mLedger->info().seq))
            {
                // We know for a fact the ledger can never be acquired
                JLOG(journal_.warn())
                    << "hash " << hash_ << " seq " << std::to_string(mSeq)
                    << " cannot be a ledger";
                mLedger.reset();
                failed_ = true;
            }
        };

        // Try to fetch the ledger header from the DB
        if (auto nodeObject = srcDB.fetchNodeObject(hash_, mSeq))
        {
            JLOG(journal_.trace()) << "Ledger header found in local store";

            makeLedger(nodeObject->getData());
            if (failed_)
                return;

            // Store the ledger header if the source and destination differ
            auto& dstDB{mLedger->stateMap().family().db()};
            if (std::addressof(dstDB) != std::addressof(srcDB))
            {
                Blob blob{nodeObject->getData()};
                dstDB.store(
                    hotLEDGER, std::move(blob), hash_, mLedger->info().seq);
            }
        }
        else
        {
            // Try to fetch the ledger header from a fetch pack
            auto data = app_.getLedgerMaster().getFetchPack(hash_);
            if (!data)
                return;

            JLOG(journal_.trace()) << "Ledger header found in fetch pack";

            makeLedger(*data);
            if (failed_)
                return;

            // Store the ledger header in the ledger's database
            mLedger->stateMap().family().db().store(
                hotLEDGER, std::move(*data), hash_, mLedger->info().seq);
        }

        if (mSeq == 0)
            mSeq = mLedger->info().seq;
        mLedger->stateMap().setLedgerSeq(mSeq);
        mLedger->txMap().setLedgerSeq(mSeq);
        mHaveHeader = true;
    }

    if (!mHaveTransactions)
    {
        if (mLedger->info().txHash.isZero())
        {
            JLOG(journal_.trace()) << "No TXNs to fetch";
            mHaveTransactions = true;
        }
        else
        {
            TransactionStateSF filter(
                mLedger->txMap().family().db(), app_.getLedgerMaster());
            if (mLedger->txMap().fetchRoot(
                    SHAMapHash{mLedger->info().txHash}, &filter))
            {
                if (neededTxHashes(1, &filter).empty())
                {
                    JLOG(journal_.trace()) << "Had full txn map locally";
                    mHaveTransactions = true;
                }
            }
        }
    }

    if (!mHaveState)
    {
        if (mLedger->info().accountHash.isZero())
        {
            JLOG(journal_.fatal())
                << "We are acquiring a ledger with a zero account hash";
            failed_ = true;
            return;
        }
        AccountStateSF filter(
            mLedger->stateMap().family().db(), app_.getLedgerMaster());
        if (mLedger->stateMap().fetchRoot(
                SHAMapHash{mLedger->info().accountHash}, &filter))
        {
            if (neededStateHashes(1, &filter).empty())
            {
                JLOG(journal_.trace()) << "Had full AS map locally";
                mHaveState = true;
            }
        }
    }

    if (mHaveTransactions && mHaveState)
    {
        JLOG(journal_.debug()) << "Had everything locally";
        complete_ = true;
        XRPL_ASSERT(
            mLedger->info().seq < XRP_LEDGER_EARLIEST_FEES ||
                mLedger->read(keylet::fees()),
            "ripple::InboundLedger::tryDB : valid ledger fees");
        mLedger->setImmutable();
    }
}

/** Called with a lock by the PeerSet when the timer expires
 */
void
InboundLedger::onTimer(bool wasProgress, ScopedLockType&)
{
    mRecentNodes.clear();

    if (isDone())
    {
        JLOG(journal_.info()) << "Already done " << hash_;
        return;
    }

    if (timeouts_ > ledgerTimeoutRetriesMax)
    {
        if (mSeq != 0)
        {
            JLOG(journal_.warn())
                << timeouts_ << " timeouts for ledger " << mSeq;
        }
        else
        {
            JLOG(journal_.warn())
                << timeouts_ << " timeouts for ledger " << hash_;
        }
        failed_ = true;
        done();
        return;
    }

    if (!wasProgress)
    {
        checkLocal();

        mByHash = true;

        std::size_t pc = getPeerCount();
        JLOG(journal_.debug())
            << "No progress(" << pc << ") for ledger " << hash_;

        // addPeers triggers if the reason is not HISTORY
        // So if the reason IS HISTORY, need to trigger after we add
        // otherwise, we need to trigger before we add
        // so each peer gets triggered once
        if (mReason != Reason::HISTORY)
            trigger(nullptr, TriggerReason::timeout);
        addPeers();
        if (mReason == Reason::HISTORY)
            trigger(nullptr, TriggerReason::timeout);
    }
}

/** Add more peers to the set, if possible */
void
InboundLedger::addPeers()
{
    mPeerSet->addPeers(
        (getPeerCount() == 0) ? peerCountStart : peerCountAdd,
        [this](auto peer) { return peer->hasLedger(hash_, mSeq); },
        [this](auto peer) {
            // For historical nodes, do not trigger too soon
            // since a fetch pack is probably coming
            if (mReason != Reason::HISTORY)
                trigger(peer, TriggerReason::added);
        });
}

std::weak_ptr<TimeoutCounter>
InboundLedger::pmDowncast()
{
    return shared_from_this();
}

void
InboundLedger::done()
{
    if (mSignaled)
        return;

    mSignaled = true;
    touch();

    JLOG(journal_.debug()) << "Acquire " << hash_ << (failed_ ? " fail " : " ")
                           << ((timeouts_ == 0)
                                   ? std::string()
                                   : (std::string("timeouts:") +
                                      std::to_string(timeouts_) + " "))
                           << mStats.get();

    XRPL_ASSERT(
        complete_ || failed_,
        "ripple::InboundLedger::done : complete or failed");

    if (complete_ && !failed_ && mLedger)
    {
        XRPL_ASSERT(
            mLedger->info().seq < XRP_LEDGER_EARLIEST_FEES ||
                mLedger->read(keylet::fees()),
            "ripple::InboundLedger::done : valid ledger fees");
        mLedger->setImmutable();
        switch (mReason)
        {
            case Reason::HISTORY:
                app_.getInboundLedgers().onLedgerFetched();
                break;
            default:
                app_.getLedgerMaster().storeLedger(mLedger);
                break;
        }
    }

    // We hold the PeerSet lock, so must dispatch
    app_.getJobQueue().addJob(
        jtLEDGER_DATA, "AcquisitionDone", [self = shared_from_this()]() {
            if (self->complete_ && !self->failed_)
            {
                self->app_.getLedgerMaster().checkAccept(self->getLedger());
                self->app_.getLedgerMaster().tryAdvance();
            }
            else
                self->app_.getInboundLedgers().logFailure(
                    self->hash_, self->mSeq);
        });
}

/** Request more nodes, perhaps from a specific peer
 */
void
InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
{
    ScopedLockType sl(mtx_);

    if (isDone())
    {
        JLOG(journal_.debug())
            << "Trigger on ledger: " << hash_ << (complete_ ? " completed" : "")
            << (failed_ ? " failed" : "");
        return;
    }

    if (auto stream = journal_.debug())
    {
        std::stringstream ss;
        ss << "Trigger acquiring ledger " << hash_;
        if (peer)
            ss << " from " << peer;

        if (complete_ || failed_)
            ss << " complete=" << complete_ << " failed=" << failed_;
        else
            ss << " header=" << mHaveHeader << " tx=" << mHaveTransactions
               << " as=" << mHaveState;
        stream << ss.str();
    }

    if (!mHaveHeader)
    {
        tryDB(app_.getNodeFamily().db());
        if (failed_)
        {
            JLOG(journal_.warn()) << " failed local for " << hash_;
            return;
        }
    }

    protocol::TMGetLedger tmGL;
    tmGL.set_ledgerhash(hash_.begin(), hash_.size());

    if (timeouts_ != 0)
    {
        // Be more aggressive if we've timed out at least once
        tmGL.set_querytype(protocol::qtINDIRECT);

        if (!progress_ && !failed_ && mByHash &&
            (timeouts_ > ledgerBecomeAggressiveThreshold))
        {
            auto need = getNeededHashes();

            if (!need.empty())
            {
                protocol::TMGetObjectByHash tmBH;
                bool typeSet = false;
                tmBH.set_query(true);
                tmBH.set_ledgerhash(hash_.begin(), hash_.size());
                for (auto const& p : need)
                {
                    JLOG(journal_.debug()) << "Want: " << p.second;

                    if (!typeSet)
                    {
                        tmBH.set_type(p.first);
                        typeSet = true;
                    }

                    if (p.first == tmBH.type())
                    {
                        protocol::TMIndexedObject* io = tmBH.add_objects();
                        io->set_hash(p.second.begin(), p.second.size());
                        if (mSeq != 0)
                            io->set_ledgerseq(mSeq);
                    }
                }

                auto packet =
                    std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS);
                auto const& peerIds = mPeerSet->getPeerIds();
                std::for_each(
                    peerIds.begin(), peerIds.end(), [this, &packet](auto id) {
                        if (auto p = app_.overlay().findPeerByShortID(id))
                        {
                            mByHash = false;
                            p->send(packet);
                        }
                    });
            }
            else
            {
                JLOG(journal_.info())
                    << "getNeededHashes says acquire is complete";
                mHaveHeader = true;
                mHaveTransactions = true;
                mHaveState = true;
                complete_ = true;
            }
        }
    }

    // We can't do much without the header data because we don't know the
    // state or transaction root hashes.
    if (!mHaveHeader && !failed_)
    {
        tmGL.set_itype(protocol::liBASE);
        if (mSeq != 0)
            tmGL.set_ledgerseq(mSeq);
        JLOG(journal_.trace()) << "Sending header request to "
                               << (peer ? "selected peer" : "all peers");
        mPeerSet->sendRequest(tmGL, peer);
        return;
    }

    if (mLedger)
        tmGL.set_ledgerseq(mLedger->info().seq);

    if (reason != TriggerReason::reply)
    {
        // If we're querying blind, don't query deep
        tmGL.set_querydepth(0);
    }
    else if (peer && peer->isHighLatency())
    {
        // If the peer has high latency, query extra deep
        tmGL.set_querydepth(2);
    }
    else
        tmGL.set_querydepth(1);

    // Get the state data first because it's the most likely to be useful
    // if we wind up abandoning this fetch.
    if (mHaveHeader && !mHaveState && !failed_)
    {
        XRPL_ASSERT(
            mLedger,
            "ripple::InboundLedger::trigger : non-null ledger to read state "
            "from");

        if (!mLedger->stateMap().isValid())
        {
            failed_ = true;
        }
        else if (mLedger->stateMap().getHash().isZero())
        {
            // we need the root node
            tmGL.set_itype(protocol::liAS_NODE);
            *tmGL.add_nodeids() = SHAMapNodeID().getRawString();
            JLOG(journal_.trace()) << "Sending AS root request to "
                                   << (peer ? "selected peer" : "all peers");
            mPeerSet->sendRequest(tmGL, peer);
            return;
        }
        else
        {
            AccountStateSF filter(
                mLedger->stateMap().family().db(), app_.getLedgerMaster());

            // Release the lock while we process the large state map
            sl.unlock();
            auto nodes =
                mLedger->stateMap().getMissingNodes(missingNodesFind, &filter);
            sl.lock();

            // Make sure nothing happened while we released the lock
            if (!failed_ && !complete_ && !mHaveState)
            {
                if (nodes.empty())
                {
                    if (!mLedger->stateMap().isValid())
                        failed_ = true;
                    else
                    {
                        mHaveState = true;

                        if (mHaveTransactions)
                            complete_ = true;
                    }
                }
                else
                {
                    filterNodes(nodes, reason);

                    if (!nodes.empty())
                    {
                        tmGL.set_itype(protocol::liAS_NODE);
                        for (auto const& id : nodes)
                        {
                            *(tmGL.add_nodeids()) = id.first.getRawString();
                        }

                        JLOG(journal_.trace())
                            << "Sending AS node request (" << nodes.size()
                            << ") to "
                            << (peer ? "selected peer" : "all peers");
                        mPeerSet->sendRequest(tmGL, peer);
                        return;
                    }
                    else
                    {
                        JLOG(journal_.trace()) << "All AS nodes filtered";
                    }
                }
            }
        }
    }

    if (mHaveHeader && !mHaveTransactions && !failed_)
    {
        XRPL_ASSERT(
            mLedger,
            "ripple::InboundLedger::trigger : non-null ledger to read "
            "transactions from");

        if (!mLedger->txMap().isValid())
        {
            failed_ = true;
        }
        else if (mLedger->txMap().getHash().isZero())
        {
            // we need the root node
            tmGL.set_itype(protocol::liTX_NODE);
            *(tmGL.add_nodeids()) = SHAMapNodeID().getRawString();
            JLOG(journal_.trace()) << "Sending TX root request to "
                                   << (peer ? "selected peer" : "all peers");
            mPeerSet->sendRequest(tmGL, peer);
            return;
        }
        else
        {
            TransactionStateSF filter(
                mLedger->txMap().family().db(), app_.getLedgerMaster());

            auto nodes =
                mLedger->txMap().getMissingNodes(missingNodesFind, &filter);

            if (nodes.empty())
            {
                if (!mLedger->txMap().isValid())
                    failed_ = true;
                else
                {
                    mHaveTransactions = true;

                    if (mHaveState)
                        complete_ = true;
                }
            }
            else
            {
                filterNodes(nodes, reason);

                if (!nodes.empty())
                {
                    tmGL.set_itype(protocol::liTX_NODE);
                    for (auto const& n : nodes)
                    {
                        *(tmGL.add_nodeids()) = n.first.getRawString();
                    }
                    JLOG(journal_.trace())
                        << "Sending TX node request (" << nodes.size()
                        << ") to " << (peer ? "selected peer" : "all peers");
                    mPeerSet->sendRequest(tmGL, peer);
                    return;
                }
                else
                {
                    JLOG(journal_.trace()) << "All TX nodes filtered";
                }
            }
        }
    }

    if (complete_ || failed_)
    {
        JLOG(journal_.debug())
            << "Done:" << (complete_ ? " complete" : "")
            << (failed_ ? " failed " : " ") << mLedger->info().seq;
        sl.unlock();
        done();
    }
}

void
InboundLedger::filterNodes(
    std::vector<std::pair<SHAMapNodeID, uint256>>& nodes,
    TriggerReason reason)
{
    // Sort nodes so that the ones we haven't recently
    // requested come before the ones we have.
    auto dup = std::stable_partition(
        nodes.begin(), nodes.end(), [this](auto const& item) {
            return mRecentNodes.count(item.second) == 0;
        });

    // If everything is a duplicate we don't want to send
    // any query at all except on a timeout where we need
    // to query everyone:
    if (dup == nodes.begin())
    {
        JLOG(journal_.trace()) << "filterNodes: all duplicates";

        if (reason != TriggerReason::timeout)
        {
            nodes.clear();
            return;
        }
    }
    else
    {
        JLOG(journal_.trace()) << "filterNodes: pruning duplicates";

        nodes.erase(dup, nodes.end());
    }

    std::size_t const limit =
        (reason == TriggerReason::reply) ? reqNodesReply : reqNodes;

    if (nodes.size() > limit)
        nodes.resize(limit);

    for (auto const& n : nodes)
        mRecentNodes.insert(n.second);
}

/** Take ledger header data
    Call with a lock
*/
// data must not have hash prefix
bool
InboundLedger::takeHeader(std::string const& data)
{
    // Return value: true=normal, false=bad data
    JLOG(journal_.trace()) << "got header acquiring ledger " << hash_;

    if (complete_ || failed_ || mHaveHeader)
        return true;

    auto* f = &app_.getNodeFamily();
    mLedger = std::make_shared<Ledger>(
        deserializeHeader(makeSlice(data)), app_.config(), *f);
    if (mLedger->info().hash != hash_ ||
        (mSeq != 0 && mSeq != mLedger->info().seq))
    {
        JLOG(journal_.warn())
            << "Acquire hash mismatch: " << mLedger->info().hash
            << "!=" << hash_;
        mLedger.reset();
        return false;
    }
    if (mSeq == 0)
        mSeq = mLedger->info().seq;
    mLedger->stateMap().setLedgerSeq(mSeq);
    mLedger->txMap().setLedgerSeq(mSeq);
    mHaveHeader = true;

    Serializer s(data.size() + 4);
    s.add32(HashPrefix::ledgerMaster);
    s.addRaw(data.data(), data.size());
    f->db().store(hotLEDGER, std::move(s.modData()), hash_, mSeq);

    if (mLedger->info().txHash.isZero())
        mHaveTransactions = true;

    if (mLedger->info().accountHash.isZero())
        mHaveState = true;

    mLedger->txMap().setSynching();
    mLedger->stateMap().setSynching();

    return true;
}

/** Process node data received from a peer
    Call with a lock
*/
void
InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
{
    if (!mHaveHeader)
    {
        JLOG(journal_.warn()) << "Missing ledger header";
        san.incInvalid();
        return;
    }
    if (packet.type() == protocol::liTX_NODE)
    {
        if (mHaveTransactions || failed_)
        {
            san.incDuplicate();
            return;
        }
    }
    else if (mHaveState || failed_)
    {
        san.incDuplicate();
        return;
    }

    auto [map, rootHash, filter] = [&]()
        -> std::tuple<SHAMap&, SHAMapHash, std::unique_ptr<SHAMapSyncFilter>> {
        if (packet.type() == protocol::liTX_NODE)
            return {
                mLedger->txMap(),
                SHAMapHash{mLedger->info().txHash},
                std::make_unique<TransactionStateSF>(
                    mLedger->txMap().family().db(), app_.getLedgerMaster())};
        return {
            mLedger->stateMap(),
            SHAMapHash{mLedger->info().accountHash},
            std::make_unique<AccountStateSF>(
                mLedger->stateMap().family().db(), app_.getLedgerMaster())};
    }();

    try
    {
        auto const f = filter.get();

        for (auto const& node : packet.nodes())
        {
            auto const nodeID = deserializeSHAMapNodeID(node.nodeid());

            if (!nodeID)
                throw std::runtime_error("data does not properly deserialize");

            if (nodeID->isRoot())
            {
                san += map.addRootNode(rootHash, makeSlice(node.nodedata()), f);
            }
            else
            {
                san += map.addKnownNode(*nodeID, makeSlice(node.nodedata()), f);
            }

            if (!san.isGood())
            {
                JLOG(journal_.warn()) << "Received bad node data";
                return;
            }
        }
    }
    catch (std::exception const& e)
    {
        JLOG(journal_.error()) << "Received bad node data: " << e.what();
        san.incInvalid();
        return;
    }

    if (!map.isSynching())
    {
        if (packet.type() == protocol::liTX_NODE)
            mHaveTransactions = true;
        else
            mHaveState = true;

        if (mHaveTransactions && mHaveState)
        {
            complete_ = true;
            done();
        }
    }
}

/** Process AS root node received from a peer
    Call with a lock
*/
bool
InboundLedger::takeAsRootNode(Slice const& data, SHAMapAddNode& san)
{
    if (failed_ || mHaveState)
    {
        san.incDuplicate();
        return true;
    }

    if (!mHaveHeader)
    {
        // LCOV_EXCL_START
        UNREACHABLE("ripple::InboundLedger::takeAsRootNode : no ledger header");
        return false;
        // LCOV_EXCL_STOP
    }

    AccountStateSF filter(
        mLedger->stateMap().family().db(), app_.getLedgerMaster());
    san += mLedger->stateMap().addRootNode(
        SHAMapHash{mLedger->info().accountHash}, data, &filter);
    return san.isGood();
}

/** Process AS root node received from a peer
    Call with a lock
*/
bool
InboundLedger::takeTxRootNode(Slice const& data, SHAMapAddNode& san)
{
    if (failed_ || mHaveTransactions)
    {
        san.incDuplicate();
        return true;
    }

    if (!mHaveHeader)
    {
        // LCOV_EXCL_START
        UNREACHABLE("ripple::InboundLedger::takeTxRootNode : no ledger header");
        return false;
        // LCOV_EXCL_STOP
    }

    TransactionStateSF filter(
        mLedger->txMap().family().db(), app_.getLedgerMaster());
    san += mLedger->txMap().addRootNode(
        SHAMapHash{mLedger->info().txHash}, data, &filter);
    return san.isGood();
}

std::vector<InboundLedger::neededHash_t>
InboundLedger::getNeededHashes()
{
    std::vector<neededHash_t> ret;

    if (!mHaveHeader)
    {
        ret.push_back(
            std::make_pair(protocol::TMGetObjectByHash::otLEDGER, hash_));
        return ret;
    }

    if (!mHaveState)
    {
        AccountStateSF filter(
            mLedger->stateMap().family().db(), app_.getLedgerMaster());
        for (auto const& h : neededStateHashes(4, &filter))
        {
            ret.push_back(
                std::make_pair(protocol::TMGetObjectByHash::otSTATE_NODE, h));
        }
    }

    if (!mHaveTransactions)
    {
        TransactionStateSF filter(
            mLedger->txMap().family().db(), app_.getLedgerMaster());
        for (auto const& h : neededTxHashes(4, &filter))
        {
            ret.push_back(std::make_pair(
                protocol::TMGetObjectByHash::otTRANSACTION_NODE, h));
        }
    }

    return ret;
}

/** Stash a TMLedgerData received from a peer for later processing
    Returns 'true' if we need to dispatch
*/
bool
InboundLedger::gotData(
    std::weak_ptr<Peer> peer,
    std::shared_ptr<protocol::TMLedgerData> const& data)
{
    std::lock_guard sl(mReceivedDataLock);

    if (isDone())
        return false;

    mReceivedData.emplace_back(peer, data);

    if (mReceiveDispatched)
        return false;

    mReceiveDispatched = true;
    return true;
}

/** Process one TMLedgerData
    Returns the number of useful nodes
*/
// VFALCO NOTE, it is not necessary to pass the entire Peer,
//              we can get away with just a Resource::Consumer endpoint.
//
//        TODO Change peer to Consumer
//
int
InboundLedger::processData(
    std::shared_ptr<Peer> peer,
    protocol::TMLedgerData& packet)
{
    if (packet.type() == protocol::liBASE)
    {
        if (packet.nodes().empty())
        {
            JLOG(journal_.warn()) << peer->id() << ": empty header data";
            peer->charge(
                Resource::feeMalformedRequest, "ledger_data empty header");
            return -1;
        }

        SHAMapAddNode san;

        ScopedLockType sl(mtx_);

        try
        {
            if (!mHaveHeader)
            {
                if (!takeHeader(packet.nodes(0).nodedata()))
                {
                    JLOG(journal_.warn()) << "Got invalid header data";
                    peer->charge(
                        Resource::feeMalformedRequest,
                        "ledger_data invalid header");
                    return -1;
                }

                san.incUseful();
            }

            if (!mHaveState && (packet.nodes().size() > 1) &&
                !takeAsRootNode(makeSlice(packet.nodes(1).nodedata()), san))
            {
                JLOG(journal_.warn()) << "Included AS root invalid";
            }

            if (!mHaveTransactions && (packet.nodes().size() > 2) &&
                !takeTxRootNode(makeSlice(packet.nodes(2).nodedata()), san))
            {
                JLOG(journal_.warn()) << "Included TX root invalid";
            }
        }
        catch (std::exception const& ex)
        {
            JLOG(journal_.warn())
                << "Included AS/TX root invalid: " << ex.what();
            using namespace std::string_literals;
            peer->charge(Resource::feeInvalidData, "ledger_data "s + ex.what());
            return -1;
        }

        if (san.isUseful())
            progress_ = true;

        mStats += san;
        return san.getGood();
    }

    if ((packet.type() == protocol::liTX_NODE) ||
        (packet.type() == protocol::liAS_NODE))
    {
        std::string type = packet.type() == protocol::liTX_NODE ? "liTX_NODE: "
                                                                : "liAS_NODE: ";

        if (packet.nodes().empty())
        {
            JLOG(journal_.info()) << peer->id() << ": response with no nodes";
            peer->charge(Resource::feeMalformedRequest, "ledger_data no nodes");
            return -1;
        }

        ScopedLockType sl(mtx_);

        // Verify node IDs and data are complete
        for (auto const& node : packet.nodes())
        {
            if (!node.has_nodeid() || !node.has_nodedata())
            {
                JLOG(journal_.warn()) << "Got bad node";
                peer->charge(
                    Resource::feeMalformedRequest, "ledger_data bad node");
                return -1;
            }
        }

        SHAMapAddNode san;
        receiveNode(packet, san);

        JLOG(journal_.debug())
            << "Ledger "
            << ((packet.type() == protocol::liTX_NODE) ? "TX" : "AS")
            << " node stats: " << san.get();

        if (san.isUseful())
            progress_ = true;

        mStats += san;
        return san.getGood();
    }

    return -1;
}

namespace detail {
// Track the amount of useful data that each peer returns
struct PeerDataCounts
{
    // Map from peer to amount of useful the peer returned
    std::unordered_map<std::shared_ptr<Peer>, int> counts;
    // The largest amount of useful data that any peer returned
    int maxCount = 0;

    // Update the data count for a peer
    void
    update(std::shared_ptr<Peer>&& peer, int dataCount)
    {
        if (dataCount <= 0)
            return;
        maxCount = std::max(maxCount, dataCount);
        auto i = counts.find(peer);
        if (i == counts.end())
        {
            counts.emplace(std::move(peer), dataCount);
            return;
        }
        i->second = std::max(i->second, dataCount);
    }

    // Prune all the peers that didn't return enough data.
    void
    prune()
    {
        // Remove all the peers that didn't return at least half as much data as
        // the best peer
        auto const thresh = maxCount / 2;
        auto i = counts.begin();
        while (i != counts.end())
        {
            if (i->second < thresh)
                i = counts.erase(i);
            else
                ++i;
        }
    }

    // call F with the `peer` parameter with a random sample of at most n values
    // of the counts vector.
    template <class F>
    void
    sampleN(std::size_t n, F&& f)
    {
        if (counts.empty())
            return;

        auto outFunc = [&f](auto&& v) { f(v.first); };
        std::minstd_rand rng{std::random_device{}()};
#if _MSC_VER
        std::vector<std::pair<std::shared_ptr<Peer>, int>> s;
        s.reserve(n);
        std::sample(
            counts.begin(), counts.end(), std::back_inserter(s), n, rng);
        for (auto& v : s)
        {
            outFunc(v);
        }
#else
        std::sample(
            counts.begin(),
            counts.end(),
            boost::make_function_output_iterator(outFunc),
            n,
            rng);
#endif
    }
};
}  // namespace detail

/** Process pending TMLedgerData
    Query the a random sample of the 'best' peers
*/
void
InboundLedger::runData()
{
    // Maximum number of peers to request data from
    constexpr std::size_t maxUsefulPeers = 6;

    decltype(mReceivedData) data;

    // Reserve some memory so the first couple iterations don't reallocate
    data.reserve(8);

    detail::PeerDataCounts dataCounts;

    for (;;)
    {
        data.clear();

        {
            std::lock_guard sl(mReceivedDataLock);

            if (mReceivedData.empty())
            {
                mReceiveDispatched = false;
                break;
            }

            data.swap(mReceivedData);
        }

        for (auto& entry : data)
        {
            if (auto peer = entry.first.lock())
            {
                int count = processData(peer, *(entry.second));
                dataCounts.update(std::move(peer), count);
            }
        }
    }

    // Select a random sample of the peers that gives us the most nodes that are
    // useful
    dataCounts.prune();
    dataCounts.sampleN(maxUsefulPeers, [&](std::shared_ptr<Peer> const& peer) {
        trigger(peer, TriggerReason::reply);
    });
}

Json::Value
InboundLedger::getJson(int)
{
    Json::Value ret(Json::objectValue);

    ScopedLockType sl(mtx_);

    ret[jss::hash] = to_string(hash_);

    if (complete_)
        ret[jss::complete] = true;

    if (failed_)
        ret[jss::failed] = true;

    if (!complete_ && !failed_)
        ret[jss::peers] = static_cast<int>(mPeerSet->getPeerIds().size());

    ret[jss::have_header] = mHaveHeader;

    if (mHaveHeader)
    {
        ret[jss::have_state] = mHaveState;
        ret[jss::have_transactions] = mHaveTransactions;
    }

    ret[jss::timeouts] = timeouts_;

    if (mHaveHeader && !mHaveState)
    {
        Json::Value hv(Json::arrayValue);
        for (auto const& h : neededStateHashes(16, nullptr))
        {
            hv.append(to_string(h));
        }
        ret[jss::needed_state_hashes] = hv;
    }

    if (mHaveHeader && !mHaveTransactions)
    {
        Json::Value hv(Json::arrayValue);
        for (auto const& h : neededTxHashes(16, nullptr))
        {
            hv.append(to_string(h));
        }
        ret[jss::needed_transaction_hashes] = hv;
    }

    return ret;
}

}  // namespace ripple
